package com.hzw.monitor.mysqlbinlog.connection;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.hzw.monitor.mysqlbinlog.event.data.TableMapEventData;
import com.hzw.monitor.mysqlbinlog.type.ChecksumType;
import com.hzw.monitor.mysqlbinlog.utils.MyConstants;
import com.hzw.monitor.mysqlbinlog.utils.SqlUtils;
import com.hzw.monitor.mysqlbinlog.utils.StringUtils;

/**
 * Holds the attributes of one remote MySQL connection: endpoint, credentials,
 * ZooKeeper paths, filter rules, the current binlog file/position and a few
 * per-connection caches (learned filter decisions, pending table-map events
 * and database/table column mappings).
 * <p>
 * The class uses plain {@link HashMap}s and unsynchronized fields; nothing in
 * it provides synchronization.
 */
public class ConnectionAttributes {

	private static final Logger logger = LogManager.getLogger(ConnectionAttributes.class);

	/**
	 * Lowest binlog position this class will store or return.
	 * NOTE(review): presumably the offset just past the binlog file's magic
	 * header - confirm against the replication protocol code.
	 */
	private static final long MIN_BINLOG_POSITION = 4;

	private String runningZKPath; // ZooKeeper "running" path
	private String binlogPositionZKPath; // ZooKeeper binlog-position path
	private long clientId; // client id
	private String ip; // host
	private int port;
	private String usernameForReplication; // credentials used for replication
	private String passwordForReplication;
	private String usernameForSchema; // credentials used for fetching the schema
	private String passwordForSchema;
	private ChecksumType checksumType = ChecksumType.NONE; // checksum
	// Filter rules; never null, an empty list means "accept everything".
	private List<Rule> rules = new ArrayList<Rule>();
	// Cache of accept() decisions keyed by StringUtils.union(database, table).
	private HashMap<String, Boolean> learnedRules = new HashMap<String, Boolean>();
	private int accumalatedBinlogPositionCount = 0; // defaults to 0; local counter used as a speed-up
	private String binlogFileName = null; // binlog file name
	private long binlogPosition = MIN_BINLOG_POSITION;

	// Fields used by the ZK history logic.
	// NOTE(review): the original comments described currentPosition as "position
	// of this record" and nextPosition as "position of the previous record";
	// the latter contradicts the field name - confirm the intended meaning.
	private Long currentPosition;
	private Long nextPosition;
	private String historyPositionDay; // day the position was last stored, e.g. 20160126

	// Temporary table-map events; entries should be removed as soon as they are
	// consumed so the map does not keep growing.
	private HashMap<Long, TableMapEventData> tableMapEventDatas = new HashMap<Long, TableMapEventData>();

	// Column mappings for this connection, keyed by StringUtils.union(database, table).
	private HashMap<String, HashMap<String, String>> databaseTableColumnsMapping = new HashMap<String, HashMap<String, String>>();

	/** @return the host set via {@link #setIpPort(String, int)} */
	public String getIp() {
		return ip;
	}

	/** @return the port set via {@link #setIpPort(String, int)} */
	public int getPort() {
		return port;
	}

	/** @return the current position used by the ZK history logic, may be null */
	public Long getCurrentPosition() {
		return currentPosition;
	}

	/** @param currentPosition the current position used by the ZK history logic */
	public void setCurrentPosition(Long currentPosition) {
		this.currentPosition = currentPosition;
	}

	/** @return the next position used by the ZK history logic, may be null */
	public Long getNextPosition() {
		return nextPosition;
	}

	/** @param nextPosition the next position used by the ZK history logic */
	public void setNextPosition(Long nextPosition) {
		this.nextPosition = nextPosition;
	}

	/** @return the day the position was last stored, may be null */
	public String getHistoryPositionDay() {
		return historyPositionDay;
	}

	/** @param historyPositionDay the day the position was last stored */
	public void setHistoryPositionDay(String historyPositionDay) {
		this.historyPositionDay = historyPositionDay;
	}

	/**
	 * Sets the ZooKeeper running path.
	 *
	 * @param runningPath the path to store
	 * @return this instance, for chaining
	 */
	public ConnectionAttributes setRunningZKPath(String runningPath) {
		this.runningZKPath = runningPath;
		return this;
	}

	/** @return the ZooKeeper running path */
	public String getRunningZKPath() {
		return runningZKPath;
	}

	/**
	 * Sets the ZooKeeper binlog-position path.
	 *
	 * @param binlogPositionPath the path to store
	 * @return this instance, for chaining
	 */
	public ConnectionAttributes setBinlogPositionZKPath(String binlogPositionPath) {
		this.binlogPositionZKPath = binlogPositionPath;
		return this;
	}

	/** @return the ZooKeeper binlog-position path */
	public String getBinlogPositionZKPath() {
		return binlogPositionZKPath;
	}

	/**
	 * Sets the client id.
	 *
	 * @param clientId the client id
	 * @return this instance, for chaining
	 */
	public ConnectionAttributes setClientId(long clientId) {
		this.clientId = clientId;
		return this;
	}

	/** @return the client id */
	public long getClientId() {
		return clientId;
	}

	/**
	 * Sets the remote endpoint.
	 *
	 * @param ip   the host
	 * @param port the port
	 * @return this instance, for chaining
	 */
	public ConnectionAttributes setIpPort(String ip, int port) {
		this.ip = ip;
		this.port = port;
		return this;
	}

	/** @param usernameForReplication user name used for replication */
	public void setUsernameForReplication(String usernameForReplication) {
		this.usernameForReplication = usernameForReplication;
	}

	/** @return user name used for replication */
	public String getUsernameForReplication() {
		return usernameForReplication;
	}

	/** @param passwordForReplication password used for replication */
	public void setPasswordForReplication(String passwordForReplication) {
		this.passwordForReplication = passwordForReplication;
	}

	/** @return password used for replication */
	public String getPasswordForReplication() {
		return passwordForReplication;
	}

	/** @param usernameForSchema user name used for fetching the schema */
	public void setUsernameForSchema(String usernameForSchema) {
		this.usernameForSchema = usernameForSchema;
	}

	/** @return user name used for fetching the schema */
	public String getUsernameForSchema() {
		return usernameForSchema;
	}

	/** @param passwordForSchema password used for fetching the schema */
	public void setPasswordForSchema(String passwordForSchema) {
		this.passwordForSchema = passwordForSchema;
	}

	/** @return password used for fetching the schema */
	public String getPasswordForSchema() {
		return passwordForSchema;
	}

	/** @param checksumType the checksum type of this connection */
	public void setChecksumType(ChecksumType checksumType) {
		this.checksumType = checksumType;
	}

	/** @return the checksum type, {@code ChecksumType.NONE} unless changed */
	public ChecksumType getChecksumType() {
		return checksumType;
	}

	/** @return the length reported by the current checksum type */
	public long getChecksumLength() {
		return this.checksumType.getLength();
	}

	/**
	 * Replaces the filter rules.
	 *
	 * @param rules the rules to use; {@code null} is treated like an empty list
	 *              (accept everything) so that {@link #accept(String, String)}
	 *              cannot fail on a missing rule list
	 */
	public void setRules(List<Rule> rules) {
		this.rules = (null == rules) ? new ArrayList<Rule>() : rules;
	}

	/**
	 * Decides whether events of the given database/table pass the filter rules.
	 * The decision is cached per database/table pair, so the rules are only
	 * evaluated the first time a pair is seen.
	 *
	 * @param d database name
	 * @param t table name
	 * @return {@code true} if there are no rules or at least one rule accepts
	 *         the pair, {@code false} otherwise
	 */
	public boolean accept(String d, String t) {
		// Check the locally learned decisions first.
		String key = StringUtils.union(d, t);
		Boolean learnedResult = learnedRules.get(key);
		if (null != learnedResult) {
			return learnedResult.booleanValue();
		}
		// Not learned yet: evaluate the configured rules.
		boolean result = rules.isEmpty();
		if (!result) {
			for (Rule rule : rules) {
				if (rule.accept(d, t)) {
					result = true;
					break;
				}
			}
		}
		// Remember the outcome, whether it was an accept or a reject.
		learnedRules.put(key, result);
		return result;
	}

	/**
	 * Increments the accumulated binlog position counter.
	 *
	 * @return the counter value after the increment
	 */
	public long incrAccumalatedBinlogPositionCount() {
		accumalatedBinlogPositionCount++;
		return accumalatedBinlogPositionCount;
	}

	/** @param accumalatedBinlogPositionCount the new counter value */
	public void setAccumalatedBinlogPositionCount(int accumalatedBinlogPositionCount) {
		this.accumalatedBinlogPositionCount = accumalatedBinlogPositionCount;
	}

	/** @return the accumulated binlog position counter */
	public int getAccumalatedBinlogPositionCount() {
		return accumalatedBinlogPositionCount;
	}

	/**
	 * Updates the binlog file name and position together.
	 *
	 * @param name     binlog file name; must not be null, it is stored trimmed
	 * @param position binlog position; values below 4 are stored as 4
	 * @return this instance, for chaining
	 */
	public ConnectionAttributes updateBinlogNameAndPosition(String name, long position) {
		this.binlogFileName = name.trim();
		this.binlogPosition = Math.max(position, MIN_BINLOG_POSITION);
		return this;
	}

	/** @return the binlog file name, {@code null} until it has been set */
	public String getBinlogFileName() {
		return binlogFileName;
	}

	/** @return the binlog position, never less than 4 */
	public long getBinlogPosition() {
		// Keep the stored value clamped as well, as the original getter did.
		binlogPosition = Math.max(binlogPosition, MIN_BINLOG_POSITION);
		return binlogPosition;
	}

	/**
	 * Stores a table-map event until the matching row event consumes it.
	 *
	 * @param tableId table id the event belongs to
	 * @param data    the table-map event data
	 */
	public void putTableMapEventData(long tableId, TableMapEventData data) {
		tableMapEventDatas.put(tableId, data);
	}

	/**
	 * Returns the stored table-map event for the table id and removes it in
	 * the same step.
	 *
	 * @param tableId table id to look up
	 * @return the stored event data, or {@code null} if none was stored
	 */
	public TableMapEventData getAndRmoveTableMapEventData(long tableId) {
		return tableMapEventDatas.remove(tableId);
	}

	/**
	 * Drops the cached column mapping of the given table, if any.
	 *
	 * @param database database name
	 * @param table    table name
	 */
	public void ensureDatabaseTableColumnsMappingDeleted(String database, String table) {
		String key = StringUtils.union(database, table);
		this.databaseTableColumnsMapping.remove(key);
	}

	/**
	 * Makes sure a column mapping for the given table is cached, fetching it
	 * through {@code SqlUtils.getDatabaseTableColumnsMapping} with the schema
	 * credentials when needed.
	 * <p>
	 * Fetching is comparatively expensive, but table structures rarely change,
	 * so a mapping that was fetched once is normally reused.
	 *
	 * @param database    database name
	 * @param table       table name
	 * @param forceUpdate {@code true} to fetch again even if a mapping is
	 *                    already cached (e.g. after the table structure changed)
	 */
	public void ensureDatabaseTableColumnsMappingExist(String database, String table, boolean forceUpdate) {
		String key = StringUtils.union(database, table);
		if (!forceUpdate && null != databaseTableColumnsMapping.get(key)) {
			// Already cached and no refresh requested: nothing to do.
			return;
		}
		HashMap<String, String> mappings = SqlUtils.getDatabaseTableColumnsMapping(ip, port, usernameForSchema,
				passwordForSchema, database, table);
		databaseTableColumnsMapping.put(key, mappings);
	}

	/**
	 * Looks up the cached column mapping of a table.
	 *
	 * @param database database name
	 * @param table    table name
	 * @return the cached mapping, or {@code null} if none is cached
	 */
	public HashMap<String, String> getColumnsMapping(String database, String table) {
		return databaseTableColumnsMapping.get(StringUtils.union(database, table));
	}

	/**
	 * Builds a {@code ConnectionAttributes} from its JSON description. Only the
	 * replication/schema credentials and the filter rules are read; every
	 * other attribute keeps its default.
	 * <p>
	 * A missing filter-rules array is treated as "no rules". Each rule entry
	 * must provide both a database and a table regular expression; they are
	 * passed to {@link Pattern#compile(String)} as-is.
	 *
	 * @param data JSON text describing the connection
	 * @return the populated attributes
	 * @throws IllegalArgumentException if {@code data} does not yield a JSON object
	 */
	public static ConnectionAttributes parse(String data) {
		// Extract the individual key/value pairs.
		JSONObject jsonObject = JSON.parseObject(data);
		if (null == jsonObject) {
			throw new IllegalArgumentException("connection attributes JSON is null or empty");
		}
		String ur = jsonObject.getString(MyConstants.USERNAME_FOR_REPLICATION);
		String pr = jsonObject.getString(MyConstants.PASSWORD_FOR_REPLICATION);
		String us = jsonObject.getString(MyConstants.USERNAME_FOR_SCHEMA);
		String ps = jsonObject.getString(MyConstants.PASSWORD_FOR_SCHEMA);

		ArrayList<Rule> rules = new ArrayList<Rule>();
		JSONArray rulesArray = jsonObject.getJSONArray(MyConstants.FILTER_RULES);
		if (null != rulesArray) {
			for (Object obj : rulesArray) {
				JSONObject rule = (JSONObject) obj;
				String database = rule.getString(MyConstants.DATABASE_FOR_FILTER_RULES);
				String table = rule.getString(MyConstants.TABLE_FOR_FILTER_RULES);
				rules.add(new Rule(Pattern.compile(database), Pattern.compile(table)));
			}
		}

		// Build the object and assign the extracted values.
		ConnectionAttributes attributes = new ConnectionAttributes();
		attributes.setUsernameForReplication(ur);
		attributes.setPasswordForReplication(pr);
		attributes.setUsernameForSchema(us);
		attributes.setPasswordForSchema(ps);
		attributes.setRules(rules);
		return attributes;
	}

}
